package com.bw;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.base.BaseSqlApp;
import com.bw.gmall.realtime.common.bean.TrafficPageViewBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.funciton.DorisMapFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Iterator;

/**
 * Streaming job that aggregates page-view traffic by version (vc), channel (ch),
 * area (ar) and new-visitor flag (is_new) in tumbling event-time windows and writes
 * the per-window totals (PV, UV, session count, total duration) to Doris.
 *
 * <p>Pipeline built by {@link #handle}:
 * <ol>
 *   <li>{@link #etl}: parse the raw JSON strings and drop records that cannot be used;</li>
 *   <li>{@link #keyBy}: key by device id ({@code common.mid}) and turn every page log into a
 *       {@code TrafficPageViewBean} carrying its PV/UV/SV/duration contribution;</li>
 *   <li>{@link #getWatermarksStream}: assign event timestamps and watermarks;</li>
 *   <li>{@link #getReduce}: key by the four dimensions, window and sum the measures;</li>
 *   <li>sink the result to Doris.</li>
 * </ol>
 */
public class DwsTrafficVcChArIsNewPageViewWindow extends BaseApp {

    /** Size of the tumbling event-time aggregation window, in seconds. */
    private static final long WINDOW_SIZE_SECONDS = 10L;

    /** Maximum out-of-orderness tolerated by the watermark generator, in seconds. */
    private static final long MAX_OUT_OF_ORDERNESS_SECONDS = 3L;

    /** Time-to-live of the per-device "last visit date" state, in hours. */
    private static final long LAST_VISIT_STATE_TTL_HOURS = 24L;

    /**
     * Job entry point. Delegates to {@code BaseApp.start}; the meaning of the individual
     * arguments is defined by {@code BaseApp} (not visible here) — TODO confirm against it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new DwsTrafficVcChArIsNewPageViewWindow().start(
                Constant.TOPIC_DWD_TRAFFIC_PAGE,
                Constant.DWS_TRAFFIC_VC_CH_AR_IS_NEW_PAGE_VIEW_WINDOW,
                1,
                10022);
    }

    /**
     * Wires the whole pipeline on top of the source stream.
     *
     * @param env              the stream execution environment (not used directly here)
     * @param dataStreamSource raw page-log records as JSON strings
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1. ETL: keep only the records that carry the fields needed downstream.
        SingleOutputStreamOperator<JSONObject> etlStream = etl(dataStreamSource);

        // 2. Key by mid and use keyed state to derive the per-record UV / SV / PV contribution.
        //    The keyed step only looks at the record's own "ts" field, so the watermark can be
        //    assigned after it.
        SingleOutputStreamOperator<TrafficPageViewBean> keyStream = keyBy(etlStream);

        // 3. Assign event-time timestamps and watermarks.
        SingleOutputStreamOperator<TrafficPageViewBean> watermarksStream = getWatermarksStream(keyStream);

        // 4. Key by vc / ch / ar / is_new, window, and sum the measures.
        SingleOutputStreamOperator<TrafficPageViewBean> reduceStream = getReduce(watermarksStream);

        // 5. Write the aggregated rows to Doris.
        reduceStream
                .map(new DorisMapFunction<>())
                .sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_TRAFFIC_VC_CH_AR_IS_NEW_PAGE_VIEW_WINDOW));
    }

    /**
     * Groups the beans by (vc, ch, ar, isNew), collects them in tumbling event-time windows and
     * sums pvCt, uvCt, svCt and durSum. The emitted bean is stamped with the window start/end
     * and a partition date.
     *
     * @param watermarksStream beans with timestamps and watermarks already assigned
     * @return one aggregated bean per key and window
     */
    private SingleOutputStreamOperator<TrafficPageViewBean> getReduce(SingleOutputStreamOperator<TrafficPageViewBean> watermarksStream) {
        return watermarksStream
                .keyBy(new KeySelector<TrafficPageViewBean, String>() {
                    @Override
                    public String getKey(TrafficPageViewBean bean) throws Exception {
                        return bean.getVc() + ":" + bean.getCh() + ":" + bean.getAr() + ":" + bean.getIsNew();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
                .reduce(new ReduceFunction<TrafficPageViewBean>() {
                    @Override
                    public TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {
                        // Accumulating into value1 in place is acceptable for tumbling windows,
                        // where an element belongs to exactly one window. A sliding window would
                        // need a fresh object here.
                        value1.setPvCt(value1.getPvCt() + value2.getPvCt());
                        value1.setUvCt(value1.getUvCt() + value2.getUvCt());
                        value1.setSvCt(value1.getSvCt() + value2.getSvCt());
                        value1.setDurSum(value1.getDurSum() + value2.getDurSum());
                        return value1;
                    }
                }, new WindowFunction<TrafficPageViewBean, TrafficPageViewBean, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<TrafficPageViewBean> reduced, Collector<TrafficPageViewBean> collector) throws Exception {
                        long start = timeWindow.getStart();
                        long end = timeWindow.getEnd();
                        // The reduce function above pre-aggregates the window, so the iterable
                        // holds the single reduced bean.
                        TrafficPageViewBean result = reduced.iterator().next();
                        result.setStt(DateFormatUtil.tsToDateTime(start));
                        result.setEdt(DateFormatUtil.tsToDateTime(end));
                        // Derive the partition date from the window itself rather than from the
                        // wall clock, so a window that fires after midnight, or data that is
                        // replayed later, still lands on the date its stt/edt belong to.
                        result.setCur_date(DateFormatUtil.tsToDate(start));
                        collector.collect(result);
                    }
                });
    }

    /**
     * Assigns event-time timestamps (taken from the bean's {@code ts}) and bounded
     * out-of-orderness watermarks.
     *
     * @param keyStream beans produced by {@link #keyBy}
     * @return the same beans with timestamps and watermarks attached
     */
    private SingleOutputStreamOperator<TrafficPageViewBean> getWatermarksStream(SingleOutputStreamOperator<TrafficPageViewBean> keyStream) {
        WatermarkStrategy<TrafficPageViewBean> strategy = WatermarkStrategy
                .<TrafficPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(MAX_OUT_OF_ORDERNESS_SECONDS))
                .withTimestampAssigner(new SerializableTimestampAssigner<TrafficPageViewBean>() {
                    @Override
                    public long extractTimestamp(TrafficPageViewBean bean, long recordTimestamp) {
                        return bean.getTs();
                    }
                });
        return keyStream.assignTimestampsAndWatermarks(strategy);
    }

    /**
     * Keys the page logs by device id ({@code common.mid}) and converts each one into a
     * {@code TrafficPageViewBean} with its individual contribution to the measures:
     * <ul>
     *   <li>pvCt is always 1;</li>
     *   <li>uvCt is 1 when the record's date differs from the date remembered in keyed state
     *       for this device (or nothing is remembered), otherwise 0;</li>
     *   <li>svCt is 1 when {@code page.last_page_id} is absent, otherwise 0;</li>
     *   <li>durSum is {@code page.during_time}, or 0 when that field is missing.</li>
     * </ul>
     *
     * @param etlStream validated page logs; each must contain {@code common}, {@code page},
     *                  {@code common.mid} and a numeric {@code ts}
     * @return one bean per input record
     */
    private SingleOutputStreamOperator<TrafficPageViewBean> keyBy(SingleOutputStreamOperator<JSONObject> etlStream) {
        return etlStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getJSONObject("common").getString("mid");
            }
        }).process(new ProcessFunction<JSONObject, TrafficPageViewBean>() {
            /** Date of the last record that was counted as a unique visit for the current key. */
            private ValueState<String> lastDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("last_dt", String.class);
                // The state is only written when the date changes, so with OnCreateAndWrite the
                // entry expires LAST_VISIT_STATE_TTL_HOURS after the counted visit. An expired
                // entry reads as null and the next record is simply counted again; the date
                // comparison below is what actually decides uniqueness.
                descriptor.enableTimeToLive(
                        StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(LAST_VISIT_STATE_TTL_HOURS))
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                .build());

                lastDtState = getRuntimeContext().getState(descriptor);
            }

            @Override
            public void processElement(JSONObject jsonObject, ProcessFunction<JSONObject, TrafficPageViewBean>.Context context, Collector<TrafficPageViewBean> collector) throws Exception {
                Long ts = jsonObject.getLong("ts");
                String curDt = DateFormatUtil.tsToDate(ts);
                JSONObject common = jsonObject.getJSONObject("common");
                JSONObject page = jsonObject.getJSONObject("page");

                // Unique visitor: first record seen for this device on this date.
                long uvCt = 0L;
                if (!curDt.equals(lastDtState.value())) {
                    uvCt = 1L;
                    lastDtState.update(curDt);
                }

                // Session: a page without a predecessor starts a new session.
                long svCt = page.getString("last_page_id") == null ? 1L : 0L;

                // A missing during_time must not become a null measure: the window reduce adds
                // the measures together and would fail on a null operand.
                Long duringTime = page.getLong("during_time");
                long durSum = duringTime == null ? 0L : duringTime;

                collector.collect(TrafficPageViewBean.builder()
                        .ar(common.getString("ar"))
                        .ch(common.getString("ch"))
                        .vc(common.getString("vc"))
                        .isNew(common.getString("is_new"))
                        .uvCt(uvCt)
                        .svCt(svCt)
                        .pvCt(1L)
                        .durSum(durSum)
                        .ts(ts)
                        .sid(common.getString("sid"))
                        .build());
            }
        });
    }

    /**
     * Parses the raw Kafka records and forwards only those usable downstream: valid JSON with a
     * {@code common} object, a {@code page} object, a {@code common.mid} and a numeric
     * {@code ts}. Everything else is dropped.
     *
     * @param kafkaSource raw records as JSON strings
     * @return the parsed, validated records
     */
    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> kafkaSource) {
        return kafkaSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                JSONObject jsonObject = parseValidPageLog(s);
                // Emit outside of the parsing try/catch so that failures raised while emitting
                // are never mistaken for dirty input.
                if (jsonObject != null) {
                    collector.collect(jsonObject);
                }
            }
        });
    }

    /**
     * Parses one raw record and checks that it has the fields the job relies on.
     *
     * @param raw the raw JSON text, may be null or malformed
     * @return the parsed object, or {@code null} when the record is malformed or incomplete
     */
    private static JSONObject parseValidPageLog(String raw) {
        try {
            JSONObject jsonObject = JSON.parseObject(raw);
            if (jsonObject == null) {
                return null;
            }
            JSONObject common = jsonObject.getJSONObject("common");
            JSONObject page = jsonObject.getJSONObject("page");
            if (common == null || page == null) {
                return null;
            }
            // ts is read with getLong because that is how the keyed step consumes it; a
            // non-numeric value is rejected here instead of failing downstream.
            boolean complete = common.getString("mid") != null && jsonObject.getLong("ts") != null;
            return complete ? jsonObject : null;
        } catch (RuntimeException ignored) {
            // Dirty record (not valid JSON, or a field of an unexpected type): drop it like
            // any other incomplete record instead of failing the whole job.
            return null;
        }
    }
}